/** Registry of the logs known to this manager, keyed by topic and partition. */
private val logs = new Pool[TopicAndPartition, Log]()

/**
 * Create a log for the given topic and partition.
 *
 * If a log is already registered for the partition, that same instance is
 * returned (not a copy) and `config` is ignored. Otherwise a directory named
 * `<topic>-<partition>` is created under the directory chosen by
 * `nextLogDir()`, a new log is opened on it with a recovery point of 0, and
 * the log is registered before being returned.
 *
 * The lookup and the creation both run while holding
 * `logCreationOrDeletionLock`.
 *
 * @param topicAndPartition the topic and partition the log belongs to
 * @param config the configuration used when a new log has to be created
 * @return the existing log for the partition, or the newly created one
 */
def createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log = {
  logCreationOrDeletionLock synchronized {
    // The pool yields null for an unknown key; another thread may have created
    // the log before we acquired the lock, in which case we reuse it.
    Option(logs.get(topicAndPartition)).getOrElse {
      val dataDir = nextLogDir()
      val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
      // NOTE(review): the result of mkdirs() is not checked here, so a failure to
      // create the directory only surfaces later (if at all) -- confirm Log handles it.
      dir.mkdirs()
      val created = new Log(dir, config, recoveryPoint = 0L, scheduler, time)
      logs.put(topicAndPartition, created)
      // Explicit asScala instead of the deprecated implicit JavaConversions.
      val renderedProps = {
        import scala.collection.JavaConverters._
        config.toProps.asScala.mkString(", ")
      }
      info("Created log for partition [%s,%d] in %s with properties {%s}."
        .format(topicAndPartition.topic,
                topicAndPartition.partition,
                dataDir.getAbsolutePath,
                renderedProps))
      created
    }
  }
}

/**
 * Choose the next directory in which to create a log. Currently this is done
 * by counting the logs in each data directory and then choosing the data
 * directory with the fewest logs. When only one data directory is configured
 * it is returned directly without counting.
 *
 * NOTE(review): assumes at least one data directory is configured; with none,
 * the minimum lookup below throws.
 *
 * @return the data directory that should host the next log
 */
private def nextLogDir(): File = {
  if (logDirs.size == 1) {
    logDirs(0)
  } else {
    // count the number of logs in each parent directory (including 0 for empty directories)
    val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size)
    val zeros = logDirs.map(dir => (dir.getPath, 0)).toMap
    // entries from logCounts override the zero defaults for directories that hold logs
    val dirCounts = zeros ++ logCounts

    // choose the directory with the least logs in it (first one wins on ties)
    val leastLoaded = dirCounts.minBy(_._2)
    new File(leastLoaded._1)
  }
}